本文来看看Apache Curator客户端的使用吧!
一、前言
对于上一章中应用的java 原生API来操作节点。来看看他的不足:
超时重连,不支持自动,需要手动操作
watcher注册一次后会失效
不支持递归创建节点
对于Apache Curator开源客户端,具有以下的优点:
Apache的开源项目,值得信赖
解决watcher的注册一次就失效的问题
API更加简单易用
提供更多解决方案并且实现简单,比如分布式锁
提供常用的zookeeper工具类
编程风格更爽
本篇文章为上半部分,主要学习了一下自动重连、创建节点、查询节点数据和子节点、删除和修改节点数据。还有就是用nodeCache以及PathChildrenCache缓存节点数据来解决注册一次就失效的问题。
二、使用
首先新建一个maven工程,我这里直接新建了一个SpringBoot工程,依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-framework</artifactId > <version > 4.0.0</version > </dependency > <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-recipes</artifactId > <version > 4.0.0</version > </dependency > <dependency > <groupId > org.apache.zookeeper</groupId > <artifactId > zookeeper</artifactId > <version > 3.4.11</version > </dependency >
三、连接&自动重连
配置完依赖后,我们就可以来写一个简单的demo测试与zookeeper服务端的连接。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class CuratorConnect { public CuratorFramework client = null ; private static final String zkServerPath = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183" ; public CuratorConnect () { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000 ,5 ); client = CuratorFrameworkFactory.builder() .connectString(zkServerPath) .sessionTimeoutMs(10 *1000 ) .retryPolicy(retryPolicy) .build(); client.start(); } private void closeZKClient () { if (client != null ){ client.close(); } } public static void main (String[] args) { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkClientStart = curatorConnect.client.isStarted(); System.out.println("客户端是否打开:" +isZkClientStart); curatorConnect.closeZKClient(); isZkClientStart = curatorConnect.client.isStarted(); System.out.println("客户端是否打开:" +isZkClientStart); } }
curator连接zookeeper服务器时有自动重连机制,而curator的重连策略有五种。
1 2 3 4 5 6 7 8 9 10 11 12 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000 , 5 );
1 2 3 4 5 6 7 8 9 RetryPolicy retryPolicy = new RetryNTimes(3 , 5000 );
1 2 3 4 5 6 7 8 9 RetryPolicy retryPolicy2 = new RetryOneTime(3000 );
1 2 3 4 RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
1 2 3 4 5 6 7 8 9 RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000 , 3000 );
四、zk命名空间以及创建节点
zookeeper的命名空间就类似于我们平时使用Eclipse等开发工具的工作空间一样,我们该连接中所有的操作都是基于这个命名空间的。curator提供了设置命名空间的方法,这样我们任何的连接都可以去设置一个命名空间。设置了命名空间并成功连接后,zookeeper的根节点会多出一个以命名空间名称所命名的节点。然后我们在该连接的增删查改等操作都会在这个节点中进行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class CuratorCreateNode { public CuratorFramework client = null ; private static final String zkServerIps = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183" ; public CuratorCreateNode () { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000 , 5 ); client = CuratorFrameworkFactory.builder() .connectString(zkServerIps) .sessionTimeoutMs(10000 ).retryPolicy(retryPolicy) .namespace("workspace" ).build(); client.start(); } private void closeZKClient () { if (client != null ) { this .client.close(); } } public static void main (String[] args) throws Exception { CuratorCreateNode curatorConnect = new CuratorCreateNode(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; byte [] data = "this is a test data" .getBytes(); String result = curatorConnect.client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(nodePath, data); System.out.println(result + "节点,创建成功..." ); Thread.sleep(1000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }
运行该类,控制台输出信息如下:
1 2 3 当前客户端的状态:连接中... /super/testNode节点,创建成功... 当前客户端的状态:已关闭...
五、修改节点以及删除节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public class CuratorConnect { public CuratorFramework client = null ; private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181" ; public CuratorConnect () { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000 , 5 ); client = CuratorFrameworkFactory.builder() .connectString(zkServerIps) .sessionTimeoutMs(10000 ).retryPolicy(retryPolicy) .namespace("workspace" ).build(); client.start(); } private void closeZKClient () { if (client != null ) { this .client.close(); } } public static void main (String[] args) throws Exception { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; byte [] newData = "this is a new data" .getBytes(); Stat resultStat = curatorConnect.client.setData().withVersion(0 ) .forPath(nodePath, newData); System.out.println("更新节点数据成功,新的数据版本为:" + resultStat.getVersion()); curatorConnect.client.delete() .guaranteed() .deletingChildrenIfNeeded() .withVersion(resultStat.getVersion()) .forPath(nodePath); Thread.sleep(1000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }
六、查询节点相关信息
1.获取某个节点的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 ... public class CuratorConnect { ... public static void main (String[] args) throws Exception { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; Stat stat = new Stat(); byte [] nodeData = curatorConnect.client.getData().storingStatIn(stat).forPath(nodePath); System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData)); System.out.println("该节点的数据版本号为:" + stat.getVersion()); Thread.sleep(1000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }
2.获取某个节点下的子节点列表,现有一个节点的子节点列表如下:
1 2 3 [zk: localhost:2181(CONNECTED) 33] ls /workspace/super/testNode [threeNode, twoNode, oneNode] [zk: localhost:2181(CONNECTED) 34]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 ... public class CuratorConnect { ... public static void main (String[] args) throws Exception { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; List<String> childNodes = curatorConnect.client.getChildren().forPath(nodePath); System.out.println(nodePath + " 节点下的子节点列表:" ); for (String childNode : childNodes) { System.out.println(childNode); } Thread.sleep(1000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }
3.查询某个节点是否存在
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 ... public class CuratorConnect { ... public static void main (String[] args) throws Exception { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; Stat statExist = curatorConnect.client.checkExists().forPath(nodePath); if (statExist == null ) { System.out.println(nodePath + " 节点不存在" ); } else { System.out.println(nodePath + " 节点存在" ); } Thread.sleep(1000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }
七、curator之usingWatcher
curator在注册watch事件上,提供了一个usingWatcher方法,使用这个方法注册的watch事件和默认watch事件一样,监听只会触发一次,监听完毕后就会销毁,也就是一次性的。而这个方法有两种参数可选,一个是zk原生API的Watcher接口的实现类,另一个是Curator提供的CuratorWatcher接口的实现类,不过在usingWatcher方法上使用哪一个效果都是一样的,都是一次性的。
新建一个 MyWatcher 实现类,实现 Watcher 接口。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class MyWatcher implements Watcher { public void process (WatchedEvent watchedEvent) { System.out.println("触发watcher,节点路径为:" + watchedEvent.getPath()); } }
新建一个 MyCuratorWatcher 实现类,实现 CuratorWatcher 接口。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class MyCuratorWatcher implements CuratorWatcher { public void process (WatchedEvent watchedEvent) throws Exception { System.out.println("触发watcher,节点路径为:" + watchedEvent.getPath()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class CuratorConnect { ... public static void main (String[] args) throws Exception { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; curatorConnect.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath); Thread.sleep(100000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }
运行该类,然后到zookeeper服务器上修改/super/testNode节点的数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 [zk: localhost:2181 (CONNECTED) 35 ] set /workspace/super /testNode new -data cZxid = 0xb00000015 ctime = Sat Apr 28 20 :59 :57 CST 2018 mZxid = 0xb0000002b mtime = Sat Apr 28 21 :40 :58 CST 2018 pZxid = 0xb0000001c cversion = 3 dataVersion = 2 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 3 [zk: localhost:2181 (CONNECTED) 36 ]
修改完成后,此时控制台输出内容如下,因为workspace是命名空间节点,所以不会被打印出来:
1 触发watcher,节点路径为:/super/testNode
八、curator之nodeCache一次注册N次监听
想要实现watch一次注册n次监听的话,我们需要使用到curator里的一个NodeCache对象。这个对象可以用来缓存节点数据,并且可以给节点添加nodeChange事件,当节点的数据发生变化就会触发这个事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 ... public class CuratorConnect { ... public static void main (String[] args) throws Exception { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; final NodeCache nodeCache = new NodeCache(curatorConnect.client, nodePath); nodeCache.start(true ); if (nodeCache.getCurrentData() != null ) { System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData())); } else { System.out.println("节点初始化数据为空..." ); } nodeCache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged () throws Exception { if (nodeCache.getCurrentData() == null ) { System.out.println("获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除" ); return ; } String data = new String(nodeCache.getCurrentData().getData()); System.out.println(nodeCache.getCurrentData().getPath() + " 节点的数据发生变化,最新的数据为:" + data); } }); Thread.sleep(200000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }
运行该类后,我们到zookeeper服务器上,对/super/testNode节点进行如下操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 [zk: localhost:2181 (CONNECTED) 2 ] set /workspace/super /testNode change-data cZxid = 0xb00000015 ctime = Sat Apr 28 20 :59 :57 CST 2018 mZxid = 0xb00000037 mtime = Sat Apr 28 23 :49 :42 CST 2018 pZxid = 0xb0000001c cversion = 3 dataVersion = 6 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 11 numChildren = 3 [zk: localhost:2181 (CONNECTED) 3 ] set /workspace/super /testNode change-agin-data cZxid = 0xb00000015 ctime = Sat Apr 28 20 :59 :57 CST 2018 mZxid = 0xb00000038 mtime = Sat Apr 28 23 :50 :01 CST 2018 pZxid = 0xb0000001c cversion = 3 dataVersion = 7 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 16 numChildren = 3 [zk: localhost:2181 (CONNECTED) 8 ] delete /workspace/super /testNode [zk: localhost:2181 (CONNECTED) 9 ] create /workspace/super /testNode test-data Created /workspace/super /testNode [zk: localhost:2181 (CONNECTED) 10 ]
此时控制台输出内容如下:
1 2 3 4 5 6 7 当前客户端的状态:连接中... 节点初始化数据为:new-data /super/testNode 节点的数据发生变化,最新的数据为:change-data /super/testNode 节点的数据发生变化,最新的数据为:change-agin-data 获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除 /super/testNode 节点的数据发生变化,最新的数据为:test-data 当前客户端的状态:已关闭...
从控制台输出的内容可以看到,只要数据发生改变了都会触发这个事件,并且是可以重复触发的,而不是一次性的。
九、curator之PathChildrenCache子节点监听
使用NodeCache虽然能实现一次注册n次监听,但是却只能监听一个nodeChanged事件,也就是说创建、删除以及子节点的事件都无法监听。如果我们要监听某一个节点的子节点的事件,或者监听某一个特定节点的增删改事件都需要借助PathChildrenCache来实现。从名称上可以看到,PathChildrenCache也是用缓存实现的,并且也是一次注册n次监听。当我们传递一个节点路径时是监听该节点下的子节点事件,如果我们要限制监听某一个节点,只需要加上判断条件即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 ... public class CuratorConnect { ... public static void main (String[] args) throws Exception { CuratorConnect curatorConnect = new CuratorConnect(); boolean isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); String nodePath = "/super/testNode" ; final PathChildrenCache childrenCache = new PathChildrenCache(curatorConnect.client, nodePath, true ); childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); List<ChildData> childDataList = childrenCache.getCurrentData(); System.out.println("当前节点的子节点详细数据列表:" ); for (ChildData childData : childDataList) { System.out.println("\t* 子节点路径:" + new String(childData.getPath()) + ",该节点的数据为:" + new String(childData.getData())); } childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent (CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception { if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { System.out.println("\n--------------\n" ); System.out.println("子节点初始化成功" ); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { System.out.println("\n--------------\n" ); System.out.print("子节点:" + event.getData().getPath() + " 添加成功," ); System.out.println("该子节点的数据为:" + new String(event.getData().getData())); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { System.out.println("\n--------------\n" ); System.out.println("子节点:" + event.getData().getPath() + " 删除成功" ); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { System.out.println("\n--------------\n" ); System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功," ); System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData())); } } }); Thread.sleep(200000 ); curatorConnect.closeZKClient(); isZkCuratorStarted = curatorConnect.client.isStarted(); System.out.println("当前客户端的状态:" + (isZkCuratorStarted ? "连接中..." : "已关闭..." )); } }